package com.bodystm.server;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;

import com.bodystm.algorithm.BreathRate;
import com.bodystm.algorithm.DllInterface;
import com.bodystm.algorithm.MathTools;
import com.bodystm.bean.Bed;
import com.bodystm.bean.Ecg;
import com.bodystm.bean.datadeal.BreathRateAnalysis;
import com.bodystm.bean.datadeal.EcgAnalysis;
import com.bodystm.bean.datadeal.OximetryAnalysis;
import com.bodystm.config.PublicSetting;
import com.bodystm.util.MathUtils;
import com.bodystm.web.DeviceSettings;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import redis.clients.jedis.Jedis;
/**
 * 消费者使用ScheduleMQ接收数据
 * ScheduleMQ mq = new ScheduleMQ();  //也就是consumer
       mq.start();
 * @author ehl
 *
 */
public class ScheduleMQ4RESP extends Thread {
	Logger logger=Logger.getLogger(ScheduleMQ4RESP.class);
	private byte[] mac;
	private String strMac;
	private byte[] rediskey;
	private Ecg ecg=null;
	private BreathRateAnalysis breathRateAnalysis=new BreathRateAnalysis();
	//===============算法改为调用dll的形式=====================
	//private long breath=DllInterface.INSTANCE
	private long breath=DllInterface.INSTANCE.Breath_C();
	//===================================================
//	private MathTools mathTools=new MathTools();
	private long mathTools=DllInterface.INSTANCE.Resampling_C();
	private Bed bed=null;
	public ScheduleMQ4RESP(byte[] data){
		this.mac=ArrayUtils.subarray(data,  0, 6);
		this.strMac=ByteUtil.convertByteToString(mac);
		this.rediskey=data;
	}
	@Override
	public void run(){
		//==========================
		//建立websocket服务，等待客户端连接
		
		//==========================
		Jedis jedis=null;
		bed = DeviceSettings.hasBeds.get(strMac);
    	if (null!=bed&&!bed.getStatus().equals(PublicSetting.BED_STATUS_EMPTY)) {
    		ecg=bed.getEcg();
			if (ecg==null) {
				ecg=new Ecg();
				bed.setEcg(ecg);
			}
		}
		/*for (Bed bed : DeviceSettings.lstBeds) {
			if (!bed.getStatus().equals(PublicSetting.BED_STATUS_EMPTY)&&bed.getConcentratorNo().equals(strMac)) {
				ecg=bed.getEcg();
				break;
			}
		}*/
		try {
			jedis = RedisManager.getJedis();  
			//组装发送到前台的数据
			StringBuilder sbResultData=null;
			while(!isInterrupted()) {
			    if (jedis==null) {
			    	logger.error("redis服务连接失败！");
					break;
				}
			    ecg=null;//为了防止后台修改床位mac时，哈希表中bed变为新的，这里还在更新旧的bed对象
			    if (ecg==null) {
			    	bed = DeviceSettings.hasBeds.get(strMac);
			    	if (null!=bed&&!bed.getStatus().equals(PublicSetting.BED_STATUS_EMPTY)) {
			    		ecg=bed.getEcg();
			    		if (ecg==null) {
			    			ecg=new Ecg();
			    			bed.setEcg(ecg);
			    		}
			    	}
//					continue;
				}
			    //阻塞式brpop，List中无数据时阻塞  
			    //参数0表示一直阻塞下去，直到List出现数据  
			    List<byte[]> list = jedis.brpop(0, rediskey);  
			    
			    Channel channel =null;
			    ArrayList<Channel> channels=null;
			    if (WebSocketManager.wsServer!=null) {
//			    	channel = WebSocketManager.wsServer.channelsMap4RESP.get(strMac);
			    	channels = WebSocketManager.wsServer.channelsMap4RESP2.get(strMac);
			    }
//		        if (channel!=null&&channel.isOpen()) {
		        	for(byte[] bs : list) {  
		        		//忽略返回的列表中的key值
		        		if (bs.length<10) {
							continue;
						}
		        		sbResultData=new StringBuilder();
		        		sbResultData.append("/"+strMac+"/breath/");
		        		for(int i=0;i<BreathRateAnalysis.BR_POINTS_PER_PACK;i++){
		        			float value=Float.parseFloat(MathUtils.binary(ArrayUtils.subarray(bs, 8+3*i, 11+3*i), 10))-0x7f0000+0x7f;//-0x7F0000;
		        			breathRateAnalysis.dataInput[breathRateAnalysis.curNum++]=value;
		        			if (breathRateAnalysis.curNum==BreathRateAnalysis.length4OneCal) {
//		        				int resp=Math.round(BreathRate.Breath(breathRateAnalysis.dataInput, BreathRateAnalysis.length4OneCal));
		        				int resp=Math.round(DllInterface.INSTANCE.Breath_B(breathRateAnalysis.dataInput, BreathRateAnalysis.length4OneCal,breath));
		        				if (null!=ecg&&ecg.getResp()!=resp) {
		        					ecg.setResp(resp);
		        					/*if (channel!=null&&channel.isOpen()) {
		        						channel.writeAndFlush(new TextWebSocketFrame("/"+strMac+"/resp/"+resp));
		        					}*/
		        				}
		        				breathRateAnalysis.curNum=0;
		        				if (bed!=null) {
									bed.setRespUpdateMs(System.currentTimeMillis());
								}
		        				//解析数据加入缓存
			        			RedisManager.set("resp", String.valueOf(resp));
							}
		        			
		        			//重采样相关
		        			if (channels!=null&&channels.size()>0) {//channel!=null&&channel.isOpen()
//		        				double dResampleRet[]=new double[8];
//		        				int nResamplingRet=mathTools.Resampling((float)1.0/breathRateAnalysis.SAMPLE_RATE, (float)1.0/breathRateAnalysis.npTargetSampleRate,dResampleRet,(double)value,0);//0-->BodystmProduct.BodystmSpO2.ordinal()
		        				int nResamplingRet=DllInterface.INSTANCE.Resampling_B((float)1.0/breathRateAnalysis.SAMPLE_RATE, (float)1.0/breathRateAnalysis.npTargetSampleRate, value, 0, mathTools);
		        				for (int j = 0; j <= nResamplingRet; j++)
		        				{
		        					//channel.writeAndFlush(new TextWebSocketFrame("/"+strMac+"/breath/"+value));
		        					sbResultData.append(value);sbResultData.append(",");
		        				}
		        			}
		        			
		        		}
		        		/*if (channel!=null&&channel.isOpen()) {
				        	sbResultData.deleteCharAt(sbResultData.length()-1);
				        	channel.writeAndFlush(new TextWebSocketFrame(sbResultData.toString()));
				        }*/
		        		if (channels!=null&&channels.size()>0) {
		        			sbResultData.deleteCharAt(sbResultData.length()-1);
				        	String msg = sbResultData.toString();
							for(Channel ch:channels){
								if (ch!=null&&ch.isOpen()) {
									ch.writeAndFlush(new TextWebSocketFrame(msg));
							        
							    }
							}
						}
				    } 
		        	//channel.flush();
//				}
			     
  
			}
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			// Thread.sleep()方法由于中断抛出异常。  
            // Java虚拟机会先将该线程的中断标识位清除，然后抛出InterruptedException，  
            // 因为在发生InterruptedException异常的时候，会清除中断标记  
            // 如果不加处理，那么下一次循环开始的时候，就无法捕获这个异常。  
            // 故在异常处理中，再次设置中断标记位  
            Thread.currentThread().interrupt();
		} finally {
			//jedis.close();  
			RedisManager.close(jedis);
			DllInterface.INSTANCE.Resampling_D(mathTools);
			DllInterface.INSTANCE.Breath_D(breath);
		}
	}
}
